package kafka

import (
	"context"
	"github.com/Shopify/sarama"
	"sync"
)

// ConsumerHandler is the interface callers must implement to consume messages
// through GroupConsumer.
type ConsumerHandler interface {
	// SetupHook is run at the beginning of a new session, before ConsumeClaim.
	SetupHook()
	// CleanUpHook is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time
	CleanUpHook()
	// HandleMsg processes a single message. The receive loop and the MarkMessage
	// call are already taken care of by this package, so implementations only
	// need to contain the per-message logic.
	// Errors are expected to be handled inside the implementation.
	HandleMsg(msg *sarama.ConsumerMessage) error
}

// GroupConsumer holds the state needed to run a sarama consumer group.
//
// config is the sarama configuration that NewConsumer derives from setting,
// and setting is the ConsumerSetting the consumer was created with. group is
// assigned by StartGroupConsume and closed by Close; it is nil until
// StartGroupConsume has created it. once is a sync.Once one-shot guard.
type GroupConsumer struct {
	config  *sarama.Config
	setting ConsumerSetting
	group   sarama.ConsumerGroup
	once    sync.Once
}

// ConsumerSetting carries the user-facing options of a GroupConsumer.
//
// Hosts and GroupId are passed to sarama.NewConsumerGroup, and Topic is the
// single topic that is consumed. MaxRetry, when non-zero, is copied to
// Consumer.Offsets.Retry.Max of the sarama config. MaxRetryHandleMsg is copied
// into the internal handler by StartGroupConsume. ReturnError is copied to
// Consumer.Return.Errors and also decides whether errors from the group's
// Errors channel are forwarded to ErrorCallback. ErrorCallback, when non-nil,
// additionally receives the errors returned by group.Consume.
type ConsumerSetting struct {
	Name    string   `mapstructure:"name"` // connection name
	Hosts   []string `mapstructure:"hosts"`
	Topic   string   `mapstructure:"topic"`
	GroupId string   `mapstructure:"group_id"`
	// Offset may only be -1 or -2: -1 starts from the newest position when no
	// offset has been committed, -2 starts from the oldest position. Any other
	// value is treated as -1 by buildConsumerConfig.
	Offset int64 `mapstructure:"offset"`
	// AutoCommit is copied as-is to Consumer.Offsets.AutoCommit.Enable.
	// NOTE(review): the original comment stated that auto-commit is the default,
	// but the Go zero value (false) disables it unless a default is applied by
	// whatever loads this struct — confirm.
	AutoCommit        bool `mapstructure:"auto_commit"`
	MaxRetry          int  `mapstructure:"max_retry"`
	MaxRetryHandleMsg int  `mapstructure:"max_retry_handle_msg"`
	ReturnError       bool `mapstructure:"return_error"`
	ErrorCallback     func(err error)
}

// NewConsumer builds a GroupConsumer from setting. The sarama configuration is
// derived here through buildConsumerConfig; the consumer group itself is only
// created later, by StartGroupConsume. The returned error is always nil.
func NewConsumer(setting ConsumerSetting) (*GroupConsumer, error) {
	gc := &GroupConsumer{
		setting: setting,
		config:  buildConsumerConfig(setting),
	}
	return gc, nil
}

// StartGroupConsume creates the consumer group and consumes the configured
// topic, blocking the calling goroutine.
//
// Messages are delivered to handler through an internal adapter that takes
// care of the receive loop, MarkMessage and (when auto-commit is disabled)
// the commit after each message.
//
// It returns the error from sarama.NewConsumerGroup when the group cannot be
// created, and nil once the group has been closed (for example via Close).
// Any other error returned by group.Consume is reported to
// setting.ErrorCallback, when set, and consumption is resumed.
func (consumer *GroupConsumer) StartGroupConsume(handler ConsumerHandler) error {
	group, err := sarama.NewConsumerGroup(consumer.setting.Hosts, consumer.setting.GroupId, consumer.config)
	if err != nil {
		return err
	}
	consumer.group = group

	// Forward asynchronous errors of THIS group to the callback. The goroutine
	// is started per group (not once per GroupConsumer) so that a group created
	// by a later call is drained as well; it ends when the Errors channel is
	// closed, which happens when the group is closed.
	if consumer.setting.ReturnError {
		go func() {
			for err := range group.Errors() {
				if err != nil && consumer.setting.ErrorCallback != nil {
					consumer.setting.ErrorCallback(err)
				}
			}
		}()
	}

	// The context is cancelled when this function returns so that a session
	// that is still running is torn down with it.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	handlerImpl := &consumerHandlerImpl{
		exposeHandler:     handler,
		autoCommit:        consumer.setting.AutoCommit,
		maxRetryHandleMsg: consumer.setting.MaxRetryHandleMsg,
	}
	topics := []string{consumer.setting.Topic}

	// Consume returns whenever a session ends (e.g. on a rebalance), so it has
	// to be called in a loop to join the next session.
	for {
		err = group.Consume(ctx, topics, handlerImpl)
		// Once the group has been closed every further Consume call fails
		// immediately with this sentinel (returned unwrapped by sarama).
		// Without this exit the loop would spin forever after Close.
		if err == sarama.ErrClosedConsumerGroup {
			return nil
		}
		if err != nil && consumer.setting.ErrorCallback != nil {
			consumer.setting.ErrorCallback(err)
		}
	}
}

// Close shuts down the underlying consumer group. It is a no-op when no group
// has been created yet, i.e. when StartGroupConsume has not run or failed
// before assigning the group.
func (consumer *GroupConsumer) Close() {
	// group is a nil interface until StartGroupConsume assigns it; calling a
	// method on it would panic.
	if consumer.group == nil {
		return
	}
	// Close has no error result in its signature, so the error from the group
	// is intentionally discarded here.
	_ = consumer.group.Close()
}

// buildConsumerConfig translates a ConsumerSetting into a sarama.Config,
// starting from sarama.NewConfig and overriding only the consumer options that
// the setting exposes.
func buildConsumerConfig(setting ConsumerSetting) *sarama.Config {
	cfg := sarama.NewConfig()

	cfg.Consumer.Return.Errors = setting.ReturnError
	cfg.Consumer.Offsets.AutoCommit.Enable = setting.AutoCommit

	// Only -1 (newest) and -2 (oldest) are accepted as initial offset; every
	// other value falls back to -1.
	switch setting.Offset {
	case -1, -2:
		cfg.Consumer.Offsets.Initial = setting.Offset
	default:
		cfg.Consumer.Offsets.Initial = -1
	}

	// A zero MaxRetry keeps the value that sarama.NewConfig put in place.
	if retries := setting.MaxRetry; retries != 0 {
		cfg.Consumer.Offsets.Retry.Max = retries
	}

	return cfg
}

// consumerHandlerImpl adapts a user-supplied ConsumerHandler to the
// Setup/Cleanup/ConsumeClaim methods of the handler passed to group.Consume.
//
// exposeHandler is the user handler that receives the hooks and messages.
// autoCommit mirrors ConsumerSetting.AutoCommit; when it is false ConsumeClaim
// commits after every message. maxRetryHandleMsg is copied from
// ConsumerSetting.MaxRetryHandleMsg.
// NOTE(review): no code visible in this file reads maxRetryHandleMsg — confirm
// whether it is meant to drive retries of HandleMsg.
type consumerHandlerImpl struct {
	exposeHandler     ConsumerHandler
	autoCommit        bool
	maxRetryHandleMsg int
}

// Setup forwards the start-of-session notification to the user handler's
// SetupHook. The session itself is not used and the call never fails.
func (c *consumerHandlerImpl) Setup(_ sarama.ConsumerGroupSession) error {
	c.exposeHandler.SetupHook()
	return nil
}

// Cleanup forwards the end-of-session notification to the user handler's
// CleanUpHook. The session itself is not used and the call never fails.
func (c *consumerHandlerImpl) Cleanup(_ sarama.ConsumerGroupSession) error {
	c.exposeHandler.CleanUpHook()
	return nil
}

// ConsumeClaim feeds the messages of one claim to the user handler, one at a
// time, until the claim's message channel is closed or the session context is
// done. It always returns nil.
//
// Every message is marked as consumed after HandleMsg returns, whether or not
// HandleMsg reported an error: retrying is left to the handler implementation.
// When auto-commit is disabled the offset is committed after each message.
func (c *consumerHandlerImpl) ConsumeClaim(sess sarama.ConsumerGroupSession, groupClaim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-groupClaim.Messages():
			if !ok {
				// Channel closed: the claim is over (e.g. a rebalance is due).
				return nil
			}
			// The handler is expected to deal with its own errors (including
			// any retries), so the returned error is deliberately dropped.
			_ = c.exposeHandler.HandleMsg(msg)
			// Mark the message as consumed; this updates the consumer offset
			// tracked by the session.
			sess.MarkMessage(msg, "")
			// With manual commit, flush the marked offset right away.
			if !c.autoCommit {
				sess.Commit()
			}
		case <-sess.Context().Done():
			// Leave promptly when the session ends so that a rebalance or
			// shutdown is not held up by this goroutine.
			return nil
		}
	}
}
